Skip to content

Fix unbounded growth of internal stream state in timeout handling#150

Merged
Watson1978 merged 1 commit into
fluent-plugins-nursery:masterfrom
HolyGrail:fix-stale-stream-state-leak
Jun 12, 2026
Merged

Fix unbounded growth of internal stream state in timeout handling#150
Watson1978 merged 1 commit into
fluent-plugins-nursery:masterfrom
HolyGrail:fix-stale-stream-state-leak

Conversation

@HolyGrail

Copy link
Copy Markdown
Contributor

Problem

ConcatFilter keeps per-stream state in three hashes (@buffer, @buffer_size, @timeout_map), but entries are never removed at runtime once a stream finishes:

  • flush_buffer resets the entry instead of deleting it, leaving @buffer[stream_identity] = [] / @buffer_size[stream_identity] = 0 behind.
  • flush_timeout_buffer skips expired streams whose buffer is empty (next if @buffer[stream_identity].empty?) and removes only the identities it actually flushed from @timeout_map, so streams that completed normally stay in @timeout_map forever.
  • Nothing else deletes keys from @buffer / @buffer_size (flush_remaining_buffer only runs at shutdown).

As a result, every stream identity ever seen leaks three hash entries plus the identity string.

This is especially harmful in use_partial_metadata mode: the stream identity is "#{tag}:#{partial_id}", and Docker generates a unique partial_id per split message, so a long-running fluentd accumulates state for every split (>16KB) log message it has ever processed.

The impact is twofold:

  • Memory grows without bound.
  • CPU also grows: flush_timeout_buffer runs every second and iterates the whole ever-growing @timeout_map while holding @timeout_map_mutex, which process (the ingest path) also needs. Under high traffic this eventually stalls ingestion.

Fix

In flush_timeout_buffer, collect all expired stream identities (not just the flushed ones) and purge them from @timeout_map, @buffer, and @buffer_size.

In-flight streams within flush_interval are untouched. Expired streams that still had buffered content are flushed exactly as before — the only behavioral change is that their (empty) state entries are now removed as well. If the same stream identity appears again later, the entries are transparently recreated by the existing Hash.new default blocks, so per-stream behavior is unchanged.

This also replaces the reject! { include? } scan (O(map size × flushed)) with direct deletes.

Tests

Added a stale stream state cleanup sub test case:

  • purges state for streams which completed normally (the leak above)
  • purges state for streams flushed by timeout
  • keeps state for in-flight streams within flush_interval

The first two fail against the current master.

Note on how the tests advance time

While writing these tests I noticed that Fluent::Test.setup memoizes Fluent::Engine.now, so the clock is effectively frozen during driver runs and now - previous_timestamp is always 0 — the timeout condition can never become true with real sleeps. (The existing timeout tests pass because flush_remaining_buffer at shutdown emits the same error event the mocks expect, i.e. the timer-driven timeout path was not actually exercised.)

The new tests therefore advance the frozen clock via the Fluent::Engine.now= setter provided by fluent/test and let the already-running 1s periodic timer evaluate the expiry, restoring the clock in teardown.

Reproduction snippet

# Feed N complete partial-metadata sequences, then:
filter.instance_variable_get(:@timeout_map).size  #=> N (grows forever)
filter.instance_variable_get(:@buffer).size       #=> N (empty arrays)

🤖 Generated with Claude Code

flush_buffer leaves an empty entry behind in @buffer/@buffer_size, and
flush_timeout_buffer skips expired streams whose buffer is empty without
removing them from @timeout_map. Streams which complete normally
therefore leave entries in all three hashes forever.

This is especially harmful in partial metadata mode: every split
message has a unique partial_id and thus a unique stream identity, so a
long-running fluentd accumulates state for every split message it has
ever processed. Memory grows without bound, and the periodic
flush_timeout_buffer scan (which iterates the whole @timeout_map every
second while holding @timeout_map_mutex) consumes more and more CPU and
increasingly delays filter_stream.

Purge expired streams from @timeout_map, @buffer and @buffer_size
whether or not they still have buffered content. In-flight streams
within flush_interval are untouched.

The new tests advance the frozen Fluent::Engine.now (Fluent::Test.setup
memoizes it) instead of sleeping, because the expiry condition can
never become true under the frozen clock; this is also why existing
timeout tests only exercised the shutdown path.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@HolyGrail HolyGrail marked this pull request as ready for review June 11, 2026 09:44
@Watson1978

Copy link
Copy Markdown
Contributor

Thanks. I tried to reproduce.

Config

<source>
  @type forward
  port 24224
</source>

<filter test.**>
  @type concat
  key message
  use_partial_metadata true
  flush_interval 1s
</filter>

<match test.**>
  @type null
</match>

forwarder script

require "bundler/inline"
gemfile do
  source "https://rubygems.org"
  gem "fluent-logger"
end

require 'securerandom'

log = Fluent::Logger::FluentLogger.new('test', host: 'localhost', port: 24224)

loop do
  10000.times do
    partial_id = SecureRandom.uuid

    log.post("concat", {
      "message"         => "part1",
      "partial_message" => "true",
      "partial_id"      => partial_id,
      "partial_ordinal" => "1",
      "partial_last"    => "false"
    })

    log.post("concat", {
      "message"         => "part2",
      "partial_message" => "true",
      "partial_id"      => partial_id,
      "partial_ordinal" => "2",
      "partial_last"    => "true"
    })
  end

  sleep 0.1
end

result

chart

@Watson1978 Watson1978 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

@Watson1978 Watson1978 merged commit 1176f29 into fluent-plugins-nursery:master Jun 12, 2026
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants